package cn.doitedu.rtmk.demo.demo3;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.Map;

/**
 * @Author: deep as the sea
 * @Site: <a href="www.51doit.com">多易教育</a>
 * @QQ: 657270652
 * @Date: 2023/2/9
 * @Desc: 学大数据，到多易教育
 * <p>
 * 事件驱动型的基于规则的自动化实时营销系统DEMO
 * -- 用户行为日志测试数据
 * {"userId":1,"eventId":"event1","properties":{},"timeStamp":1675935310000}
 * {"userId":1,"eventId":"add_cart","properties":{"item_price":150},"timeStamp":1675935311000}
 * {"userId":1,"eventId":"item_share","properties":{"item_price":150},"timeStamp":1675935312000}
 * {"userId":1,"eventId":"item_like","properties":{"item_price":150},"timeStamp":1675935313000}
 * {"userId":1,"eventId":"item_share","properties":{"item_price":150},"timeStamp":1675935314000}
 * {"userId":1,"eventId":"item_share","properties":{"item_price":150},"timeStamp":1675935315000}
 * {"userId":1,"eventId":"add_cart","properties":{"item_price":150},"timeStamp":1675935316000}
 * {"userId":2,"eventId":"event1","properties":{},"timeStamp":1675935310000}
 * {"userId":2,"eventId":"add_cart","properties":{"item_price":150},"timeStamp":1675935311000}
 * {"userId":2,"eventId":"item_share","properties":{"item_price":150},"timeStamp":1675935312000}
 * {"userId":2,"eventId":"item_like","properties":{"item_price":150},"timeStamp":1675935313000}
 * {"userId":2,"eventId":"item_share","properties":{"item_price":150},"timeStamp":1675935314000}
 * {"userId":2,"eventId":"item_share","properties":{"item_price":150},"timeStamp":1675935315000}
 * {"userId":2,"eventId":"add_cart","properties":{"item_price":150},"timeStamp":1675935316000}
 * <p>
 * -- 规则参数测试数据
 * {"fireEventId":"add_cart","fileEventPropertyKey":"item_price","fileEventPropertyValue":100,"ageConditionMin":20,"ageConditionMax":40,"genderCondition":"male","tg03Condition":1000,"dynamicConditionValue1":2,"dynamicConditionvalue2":0}
 **/
public class Demo3 {
    public static void main(String[] args) throws Exception {
        // 构建编程环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2000);
        env.setParallelism(1);
        env.getCheckpointConfig().setCheckpointStorage("file:/d:/ckpt");


        // 读 kafka中的日志明细数据,接收用户的行为
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("doitedu:9092")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setGroupId("gpac01" + System.currentTimeMillis())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setTopics("rtmk-test-data")
                .build();
        DataStreamSource<String> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "s");
        SingleOutputStreamOperator<EventBean> beans = ds.map(json -> JSON.parseObject(json, EventBean.class));


        // 读取kafka中的规则参数
        KafkaSource<String> paramDataSource = KafkaSource.<String>builder()
                .setBootstrapServers("doitedu:9092")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setGroupId("gpac01" + System.currentTimeMillis())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setTopics("rtmk-rule-param")
                .build();
        DataStreamSource<String> paramData = env.fromSource(paramDataSource, WatermarkStrategy.noWatermarks(), "s");

        MapStateDescriptor<String, String> bcStateDesc = new MapStateDescriptor<>("param_bc_state", String.class, String.class);
        BroadcastStream<String> paramBc = paramData.broadcast(bcStateDesc);

        SingleOutputStreamOperator<RtmkMessage> res =
                beans.keyBy(EventBean::getUserId)
                        .connect(paramBc)
                        .process(new KeyedBroadcastProcessFunction<Integer, EventBean, String, RtmkMessage>() {

                            Table table;
                            ListState<EventBean> con1State;
                            ListState<EventBean> con2State;
                            RuleModel001Calculator rule001_Calculator;

                            ValueState<Integer> rule2_state;
                            RuleModel002Calculator rule002_Calculator;

                            @Override
                            public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
                                Configuration config = HBaseConfiguration.create();
                                config.set("hbase.zookeeper.quorum", "doitedu:2181");
                                Connection conn = ConnectionFactory.createConnection(config);
                                table = conn.getTable(TableName.valueOf("user_profile"));


                                StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(5)).useProcessingTime().build();
                                ListStateDescriptor<EventBean> con1StateDesc =
                                        new ListStateDescriptor<>("con1_state", EventBean.class);
                                con1StateDesc.enableTimeToLive(ttlConfig);
                                con1State = getRuntimeContext().getListState(con1StateDesc);


                                StateTtlConfig ttlConfig2 = StateTtlConfig.newBuilder(Time.minutes(30)).useProcessingTime().build();
                                ListStateDescriptor<EventBean> con2StateDesc =
                                        new ListStateDescriptor<>("con2_state", EventBean.class);
                                con1StateDesc.enableTimeToLive(ttlConfig2);
                                con2State = getRuntimeContext().getListState(con2StateDesc);


                                rule2_state = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("rule2_state", Integer.class));

                                rule001_Calculator = new RuleModel001Calculator();
                                rule002_Calculator = new RuleModel002Calculator();

                            }

                            @Override
                            public void processElement(EventBean eventBean, KeyedBroadcastProcessFunction<Integer, EventBean, String, RtmkMessage>.ReadOnlyContext ctx, Collector<RtmkMessage> out) throws Exception {
                                ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(bcStateDesc);
                                // 从广播状态中，取出规则参数对象
                                String rule001_param = broadcastState.get("001_001");
                                String rule002_param = broadcastState.get("002_001");

                                if (rule001_param != null) {
                                    rule001_Calculator.init(rule001_param);
                                    rule001_Calculator.calc(eventBean, con1State, con2State, table, out);
                                }

                                if (rule002_param != null) {
                                    rule002_Calculator.init(rule002_param);
                                    rule002_Calculator.calc(eventBean, rule2_state, out);
                                }

                            }

                            @Override
                            public void processBroadcastElement(String paramJson, KeyedBroadcastProcessFunction<Integer, EventBean, String, RtmkMessage>.Context ctx, Collector<RtmkMessage> out) throws Exception {
                                JSONObject jsonObject = JSON.parseObject(paramJson);
                                String ruleId = jsonObject.getString("ruleId");
                                System.out.println("收到规则参数，规则id为： " + ruleId);
                                // 收到广播流的参数信息，则将其放入 广播状态
                                BroadcastState<String, String> broadcastState = ctx.getBroadcastState(bcStateDesc);
                                broadcastState.put(ruleId, paramJson);

                            }
                        });


        res.print();

        env.execute();


    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Setter
    public static class EventBean {
        int userId;
        String eventId;
        Map<String, String> properties;
        long timeStamp;
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Setter
    public static class RtmkMessage {
        int userId;
        long timeStamp;
        String ruleId;
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Setter
    public static class RuleParamBean {
        String fireEventId;
        String fileEventPropertyKey;
        int fileEventPropertyValue;

        int ageConditionMin;
        int ageConditionMax;
        String genderCondition;
        int tg03Condition;

        int dynamicConditionValue1;
        int dynamicConditionvalue2;
    }

}
